package promql

import (
	"flag"
	"fmt"
	"math"
	"regexp"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/VictoriaMetrics/metrics"
	"github.com/VictoriaMetrics/metricsql"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
)

var (
	disableCache = flag.Bool("search.disableCache", false, "Whether to disable response caching. This may be useful when ingesting historical data. "+
		"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#backfilling . See also -search.resetRollupResultCacheOnStartup")
	maxPointsSubqueryPerTimeseries = flag.Int("search.maxPointsSubqueryPerTimeseries", 100e3, "The maximum number of points per series, which can be generated by subquery. "+
		"See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3")
	maxMemoryPerQuery = flagutil.NewBytes("search.maxMemoryPerQuery", 0, "The maximum amounts of memory a single query may consume. "+
		"Queries requiring more memory are rejected. The total memory limit for concurrently executed queries can be estimated "+
		"as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests . "+
		"See also -search.logQueryMemoryUsage")
	logQueryMemoryUsage = flagutil.NewBytes("search.logQueryMemoryUsage", 0, "Log query and increment vm_memory_intensive_queries_total metric each time "+
		"the query requires more memory than specified by this flag. "+
		"This may help detecting and optimizing heavy queries. Query logging is disabled by default. "+
		"See also -search.logSlowQueryDuration and -search.maxMemoryPerQuery")
	noStaleMarkers = flag.Bool("search.noStaleMarkers", false, "Set this flag to true if the database doesn't contain Prometheus stale markers, "+
		"so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets")
	minWindowForInstantRollupOptimization = flag.Duration("search.minWindowForInstantRollupOptimization", time.Hour*3, "Enable cache-based optimization for repeated queries "+
		"to /api/v1/query (aka instant queries), which contain rollup functions with lookbehind window exceeding the given value")
	maxBinaryOpPushdownLabelValues = flag.Int("search.maxBinaryOpPushdownLabelValues", 100, "The maximum number of values for a label in the first expression that can be extracted as a common label filter and pushed down to the second expression in a binary operation. "+
		"A larger value makes the pushed-down filter more complex but fewer time series will be returned. This flag is useful when selective label contains numerous values, for example `instance`, and storage resources are abundant.")
)

// The minimum number of points per timeseries for enabling time rounding.
// This improves cache hit ratio for frequently requested queries over
// big time ranges.
const minTimeseriesPointsForTimeRounding = 50

// ValidateMaxPointsPerSeries validates that the number of points for the given start, end and step do not exceed maxPoints.
func ValidateMaxPointsPerSeries(start, end, step int64, maxPoints int) error {
	if step == 0 {
		return fmt.Errorf("step can't be equal to zero")
	}
	points := (end-start)/step + 1
	if points > int64(maxPoints) {
		return fmt.Errorf("too many points for the given start=%d, end=%d and step=%d: %d; the maximum number of points is %d",
			start, end, step, points, maxPoints)
	}
	return nil
}

// AdjustStartEnd adjusts start and end values, so response caching may be enabled.
//
// See EvalConfig.mayCache() for details.
func AdjustStartEnd(start, end, step int64) (int64, int64) {
	if *disableCache {
		// Do not adjust start and end values when cache is disabled.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/563
		return start, end
	}
	points := (end-start)/step + 1
	if points < minTimeseriesPointsForTimeRounding {
		// Too small number of points for rounding.
		return start, end
	}

	// Round start and end to values divisible by step in order
	// to enable response caching (see EvalConfig.mayCache).
	start, end = alignStartEnd(start, end, step)

	// Make sure that the new number of points is the same as the initial number of points.
	newPoints := (end-start)/step + 1
	for newPoints > points {
		end -= step
		newPoints--
	}

	return start, end
}

func alignStartEnd(start, end, step int64) (int64, int64) {
	// Round start to the nearest smaller value divisible by step.
	start -= start % step
	// Round end to the nearest bigger value divisible by step.
	adjust := end % step
	if adjust > 0 {
		end += step - adjust
	}
	return start, end
}

// EvalConfig is the configuration required for query evaluation via Exec
type EvalConfig struct {
	Start int64
	End   int64
	Step  int64

	// MaxSeries is the maximum number of time series, which can be scanned by the query.
	// Zero means 'no limit'
	MaxSeries int

	// MaxPointsPerSeries is the limit on the number of points, which can be generated per each returned time series.
	MaxPointsPerSeries int

	// QuotedRemoteAddr contains quoted remote address.
	QuotedRemoteAddr string

	Deadline searchutil.Deadline

	// Whether the response can be cached.
	MayCache bool

	// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
	LookbackDelta int64

	// How many decimal digits after the point to leave in response.
	RoundDigits int

	// EnforcedTagFilterss may contain additional label filters to use in the query.
	EnforcedTagFilterss [][]storage.TagFilter

	// The callback, which returns the request URI during logging.
	// The request URI isn't stored here because its' construction may take non-trivial amounts of CPU.
	GetRequestURI func() string

	// QueryStats contains various stats for the currently executed query.
	//
	// The caller must initialize QueryStats, otherwise it isn't collected.
	QueryStats *QueryStats

	timestamps     []int64
	timestampsOnce sync.Once
}

// copyEvalConfig returns src copy.
func copyEvalConfig(src *EvalConfig) *EvalConfig {
	var ec EvalConfig
	ec.Start = src.Start
	ec.End = src.End
	ec.Step = src.Step
	ec.MaxSeries = src.MaxSeries
	ec.MaxPointsPerSeries = src.MaxPointsPerSeries
	ec.Deadline = src.Deadline
	ec.MayCache = src.MayCache
	ec.LookbackDelta = src.LookbackDelta
	ec.RoundDigits = src.RoundDigits
	ec.EnforcedTagFilterss = src.EnforcedTagFilterss
	ec.GetRequestURI = src.GetRequestURI
	ec.QueryStats = src.QueryStats

	// do not copy src.timestamps - they must be generated again.
	return &ec
}

func (ec *EvalConfig) validate() {
	if ec.Start > ec.End {
		logger.Panicf("BUG: start cannot exceed end; got %d vs %d", ec.Start, ec.End)
	}
	if ec.Step <= 0 {
		logger.Panicf("BUG: step must be greater than 0; got %d", ec.Step)
	}
}

func (ec *EvalConfig) mayCache() bool {
	if *disableCache {
		return false
	}
	if !ec.MayCache {
		return false
	}
	if ec.Start == ec.End {
		// There is no need in aligning start and end to step for instant query
		// in order to cache its results.
		return true
	}
	if ec.Start%ec.Step != 0 {
		return false
	}
	if ec.End%ec.Step != 0 {
		return false
	}
	return true
}

func (ec *EvalConfig) timeRangeString() string {
	start := storage.TimestampToHumanReadableFormat(ec.Start)
	end := storage.TimestampToHumanReadableFormat(ec.End)
	return fmt.Sprintf("[%s..%s]", start, end)
}

func (ec *EvalConfig) getSharedTimestamps() []int64 {
	ec.timestampsOnce.Do(ec.timestampsInit)
	return ec.timestamps
}

func (ec *EvalConfig) timestampsInit() {
	ec.timestamps = getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
}

func getTimestamps(start, end, step int64, maxPointsPerSeries int) []int64 {
	// Sanity checks.
	if step <= 0 {
		logger.Panicf("BUG: Step must be bigger than 0; got %d", step)
	}
	if start > end {
		logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", start, end)
	}
	if err := ValidateMaxPointsPerSeries(start, end, step, maxPointsPerSeries); err != nil {
		logger.Panicf("BUG: %s; this must be validated before the call to getTimestamps", err)
	}

	// Prepare timestamps.
	points := 1 + (end-start)/step
	timestamps := make([]int64, points)
	for i := range timestamps {
		timestamps[i] = start
		start += step
	}
	return timestamps
}

func evalExpr(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
	if qt.Enabled() {
		query := string(e.AppendString(nil))
		query = stringsutil.LimitStringLen(query, 300)
		mayCache := ec.mayCache()
		qt = qt.NewChild("eval: query=%s, timeRange=%s, step=%d, mayCache=%v", query, ec.timeRangeString(), ec.Step, mayCache)
	}
	rv, err := evalExprInternal(qt, ec, e)
	if err != nil {
		return nil, err
	}
	if qt.Enabled() {
		seriesCount := len(rv)
		pointsPerSeries := 0
		if len(rv) > 0 {
			pointsPerSeries = len(rv[0].Timestamps)
		}
		pointsCount := seriesCount * pointsPerSeries
		qt.Donef("series=%d, points=%d, pointsPerSeries=%d", seriesCount, pointsCount, pointsPerSeries)
	}
	return rv, nil
}

func evalExprInternal(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
	if me, ok := e.(*metricsql.MetricExpr); ok {
		re := &metricsql.RollupExpr{
			Expr: me,
		}
		rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)
		if err != nil {
			return nil, fmt.Errorf(`cannot evaluate %q: %w`, me.AppendString(nil), err)
		}
		return rv, nil
	}
	if re, ok := e.(*metricsql.RollupExpr); ok {
		rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)
		if err != nil {
			return nil, fmt.Errorf(`cannot evaluate %q: %w`, re.AppendString(nil), err)
		}
		return rv, nil
	}
	if fe, ok := e.(*metricsql.FuncExpr); ok {
		nrf := getRollupFunc(fe.Name)
		if nrf == nil {
			qtChild := qt.NewChild("transform %s()", fe.Name)
			rv, err := evalTransformFunc(qtChild, ec, fe)
			qtChild.Donef("series=%d", len(rv))
			return rv, err
		}
		args, re, err := evalRollupFuncArgs(qt, ec, fe)
		if err != nil {
			return nil, err
		}
		rf, err := nrf(args)
		if err != nil {
			return nil, fmt.Errorf("cannot evaluate args for %q: %w", fe.AppendString(nil), err)
		}
		rv, err := evalRollupFunc(qt, ec, fe.Name, rf, e, re, nil)
		if err != nil {
			return nil, fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err)
		}
		return rv, nil
	}
	if ae, ok := e.(*metricsql.AggrFuncExpr); ok {
		qtChild := qt.NewChild("aggregate %s()", ae.Name)
		rv, err := evalAggrFunc(qtChild, ec, ae)
		qtChild.Donef("series=%d", len(rv))
		return rv, err
	}
	if be, ok := e.(*metricsql.BinaryOpExpr); ok {
		qtChild := qt.NewChild("binary op %q", be.Op)
		rv, err := evalBinaryOp(qtChild, ec, be)
		qtChild.Donef("series=%d", len(rv))
		return rv, err
	}
	if ne, ok := e.(*metricsql.NumberExpr); ok {
		rv := evalNumber(ec, ne.N)
		return rv, nil
	}
	if se, ok := e.(*metricsql.StringExpr); ok {
		rv := evalString(ec, se.S)
		return rv, nil
	}
	if de, ok := e.(*metricsql.DurationExpr); ok {
		d := de.Duration(ec.Step)
		dSec := float64(d) / 1000
		rv := evalNumber(ec, dSec)
		return rv, nil
	}
	return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil))
}

func evalTransformFunc(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]*timeseries, error) {
	tf := getTransformFunc(fe.Name)
	if tf == nil {
		return nil, &httpserver.UserReadableError{
			Err: fmt.Errorf(`unknown func %q`, fe.Name),
		}
	}
	var args [][]*timeseries
	var err error
	switch fe.Name {
	case "", "union":
		args, err = evalExprsInParallel(qt, ec, fe.Args)
	default:
		args, err = evalExprsSequentially(qt, ec, fe.Args)
	}
	if err != nil {
		return nil, err
	}
	tfa := &transformFuncArg{
		ec:   ec,
		fe:   fe,
		args: args,
	}
	rv, err := tf(tfa)
	if err != nil {
		return nil, &httpserver.UserReadableError{
			Err: fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err),
		}
	}
	return rv, nil
}

func evalAggrFunc(qt *querytracer.Tracer, ec *EvalConfig, ae *metricsql.AggrFuncExpr) ([]*timeseries, error) {
	if callbacks := getIncrementalAggrFuncCallbacks(ae.Name); callbacks != nil {
		fe, nrf := tryGetArgRollupFuncWithMetricExpr(ae)
		if fe != nil {
			// There is an optimized path for calculating metricsql.AggrFuncExpr over rollupFunc over metricsql.MetricExpr.
			// The optimized path saves RAM for aggregates over big number of time series.
			args, re, err := evalRollupFuncArgs(qt, ec, fe)
			if err != nil {
				return nil, err
			}
			rf, err := nrf(args)
			if err != nil {
				return nil, fmt.Errorf("cannot evaluate args for aggregate func %q: %w", ae.AppendString(nil), err)
			}
			iafc := newIncrementalAggrFuncContext(ae, callbacks)
			return evalRollupFunc(qt, ec, fe.Name, rf, ae, re, iafc)
		}
	}
	args, err := evalExprsInParallel(qt, ec, ae.Args)
	if err != nil {
		return nil, err
	}
	af := getAggrFunc(ae.Name)
	if af == nil {
		return nil, &httpserver.UserReadableError{
			Err: fmt.Errorf(`unknown func %q`, ae.Name),
		}
	}
	afa := &aggrFuncArg{
		ae:   ae,
		args: args,
		ec:   ec,
	}
	qtChild := qt.NewChild("eval %s", ae.Name)
	rv, err := af(afa)
	qtChild.Done()
	if err != nil {
		return nil, fmt.Errorf(`cannot evaluate %q: %w`, ae.AppendString(nil), err)
	}
	return rv, nil
}

func evalBinaryOp(qt *querytracer.Tracer, ec *EvalConfig, be *metricsql.BinaryOpExpr) ([]*timeseries, error) {
	bf := getBinaryOpFunc(be.Op)
	if bf == nil {
		return nil, fmt.Errorf(`unknown binary op %q`, be.Op)
	}
	var err error
	var tssLeft, tssRight []*timeseries
	switch strings.ToLower(be.Op) {
	case "and", "if":
		// Fetch right-side series at first, since it usually contains
		// lower number of time series for `and` and `if` operator.
		// This should produce more specific label filters for the left side of the query.
		// This, in turn, should reduce the time to select series for the left side of the query.
		tssRight, tssLeft, err = execBinaryOpArgs(qt, ec, be.Right, be.Left, be)
	default:
		tssLeft, tssRight, err = execBinaryOpArgs(qt, ec, be.Left, be.Right, be)
	}
	if err != nil {
		return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err)
	}
	bfa := &binaryOpFuncArg{
		be:    be,
		left:  tssLeft,
		right: tssRight,
	}
	rv, err := bf(bfa)
	if err != nil {
		return nil, fmt.Errorf(`cannot evaluate %q: %w`, be.AppendString(nil), err)
	}
	return rv, nil
}

func canPushdownCommonFilters(be *metricsql.BinaryOpExpr) bool {
	switch strings.ToLower(be.Op) {
	case "or", "default":
		return false
	}
	if isAggrFuncWithoutGrouping(be.Left) || isAggrFuncWithoutGrouping(be.Right) {
		return false
	}
	return true
}

func isAggrFuncWithoutGrouping(e metricsql.Expr) bool {
	afe, ok := e.(*metricsql.AggrFuncExpr)
	if !ok {
		return false
	}
	return len(afe.Modifier.Args) == 0
}

func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
	if !canPushdownCommonFilters(be) {
		// Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters
		// from exprFirst to exprSecond.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886
		qt = qt.NewChild("execute left and right sides of %q in parallel", be.Op)
		defer qt.Done()
		var wg sync.WaitGroup

		var tssFirst []*timeseries
		var errFirst error
		qtFirst := qt.NewChild("expr1")
		wg.Add(1)
		go func() {
			defer wg.Done()
			tssFirst, errFirst = evalExpr(qtFirst, ec, exprFirst)
			qtFirst.Done()
		}()

		var tssSecond []*timeseries
		var errSecond error
		qtSecond := qt.NewChild("expr2")
		wg.Add(1)
		go func() {
			defer wg.Done()
			tssSecond, errSecond = evalExpr(qtSecond, ec, exprSecond)
			qtSecond.Done()
		}()

		wg.Wait()
		if errFirst != nil {
			return nil, nil, errFirst
		}
		if errSecond != nil {
			return nil, nil, errSecond
		}
		return tssFirst, tssSecond, nil
	}

	// Execute binary operation in the following way:
	//
	// 1) execute the exprFirst
	// 2) get common label filters for series returned at step 1
	// 3) push down the found common label filters to exprSecond. This filters out unneeded series
	//    during exprSecond execution instead of spending compute resources on extracting and processing these series
	//    before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching
	// 4) execute the exprSecond with possible additional filters found at step 3
	//
	// Typical use cases:
	// - Kubernetes-related: show pod creation time with the node name:
	//
	//     kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info
	//
	//   Without the optimization `kube_pod_info` would select and spend compute resources
	//   for more time series than needed. The selected time series would be dropped later
	//   when matching time series on the right and left sides of binary operand.
	//
	// - Generic alerting queries, which rely on `info` metrics.
	//   See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/
	//
	// - Queries, which get additional labels from `info` metrics.
	//   See https://www.robustperception.io/exposing-the-software-version-to-prometheus
	tssFirst, err := evalExpr(qt, ec, exprFirst)
	if err != nil {
		return nil, nil, err
	}
	if len(tssFirst) == 0 && !strings.EqualFold(be.Op, "or") {
		// Fast path: there is no sense in executing the exprSecond when exprFirst returns an empty result,
		// since the "exprFirst op exprSecond" would return an empty result in any case.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3349
		return nil, nil, nil
	}
	lfs := getCommonLabelFilters(tssFirst)
	lfs = metricsql.TrimFiltersByGroupModifier(lfs, be)
	exprSecond = metricsql.PushdownBinaryOpFilters(exprSecond, lfs)
	tssSecond, err := evalExpr(qt, ec, exprSecond)
	if err != nil {
		return nil, nil, err
	}
	return tssFirst, tssSecond, nil
}

func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
	if len(tss) == 0 {
		return nil
	}
	type valuesCounter struct {
		values map[string]struct{}
		count  int
	}
	m := make(map[string]*valuesCounter, len(tss[0].MetricName.Tags))
	for _, ts := range tss {
		for _, tag := range ts.MetricName.Tags {
			vc, ok := m[string(tag.Key)]
			if !ok {
				k := string(tag.Key)
				v := string(tag.Value)
				m[k] = &valuesCounter{
					values: map[string]struct{}{
						v: {},
					},
					count: 1,
				}
				continue
			}
			if len(vc.values) > *maxBinaryOpPushdownLabelValues {
				// Too many unique values found for the given tag.
				// Do not make a filter on such values, since it may slow down
				// search for matching time series.
				continue
			}
			vc.count++
			if _, ok := vc.values[string(tag.Value)]; !ok {
				vc.values[string(tag.Value)] = struct{}{}
			}
		}
	}
	lfs := make([]metricsql.LabelFilter, 0, len(m))
	var values []string
	for k, vc := range m {
		if vc.count != len(tss) {
			// Skip the tag, since it doesn't belong to all the time series.
			continue
		}
		values = values[:0]
		for s := range vc.values {
			values = append(values, s)
		}
		lf := metricsql.LabelFilter{
			Label: k,
		}
		if len(values) == 1 {
			lf.Value = values[0]
		} else {
			sort.Strings(values)
			lf.Value = joinRegexpValues(values)
			lf.IsRegexp = true
		}
		lfs = append(lfs, lf)
	}
	sort.Slice(lfs, func(i, j int) bool {
		return lfs[i].Label < lfs[j].Label
	})
	return lfs
}

func joinRegexpValues(a []string) string {
	var b []byte
	for i, s := range a {
		sQuoted := regexp.QuoteMeta(s)
		b = append(b, sQuoted...)
		if i < len(a)-1 {
			b = append(b, '|')
		}
	}
	return string(b)
}

func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.FuncExpr, newRollupFunc) {
	if len(ae.Args) != 1 {
		return nil, nil
	}
	e := ae.Args[0]
	// Make sure e contains one of the following:
	// - metricExpr
	// - metricExpr[d]
	// - rollupFunc(metricExpr)
	// - rollupFunc(metricExpr[d])

	if me, ok := e.(*metricsql.MetricExpr); ok {
		// e = metricExpr
		if me.IsEmpty() {
			return nil, nil
		}
		fe := &metricsql.FuncExpr{
			Name: "default_rollup",
			Args: []metricsql.Expr{me},
		}
		nrf := getRollupFunc(fe.Name)
		return fe, nrf
	}
	if re, ok := e.(*metricsql.RollupExpr); ok {
		if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() {
			return nil, nil
		}
		// e = metricExpr[d]
		fe := &metricsql.FuncExpr{
			Name: "default_rollup",
			Args: []metricsql.Expr{re},
		}
		nrf := getRollupFunc(fe.Name)
		return fe, nrf
	}
	fe, ok := e.(*metricsql.FuncExpr)
	if !ok {
		return nil, nil
	}
	nrf := getRollupFunc(fe.Name)
	if nrf == nil {
		return nil, nil
	}
	rollupArgIdx := metricsql.GetRollupArgIdx(fe)
	if rollupArgIdx >= len(fe.Args) {
		// Incorrect number of args for rollup func.
		return nil, nil
	}
	arg := fe.Args[rollupArgIdx]
	if me, ok := arg.(*metricsql.MetricExpr); ok {
		if me.IsEmpty() {
			return nil, nil
		}
		// e = rollupFunc(metricExpr)
		return fe, nrf
	}
	if re, ok := arg.(*metricsql.RollupExpr); ok {
		if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() {
			return nil, nil
		}
		// e = rollupFunc(metricExpr[d])
		return fe, nrf
	}
	return nil, nil
}

func evalExprsSequentially(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
	var rvs [][]*timeseries
	for _, e := range es {
		rv, err := evalExpr(qt, ec, e)
		if err != nil {
			return nil, err
		}
		rvs = append(rvs, rv)
	}
	return rvs, nil
}

func evalExprsInParallel(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
	if len(es) < 2 {
		return evalExprsSequentially(qt, ec, es)
	}
	rvs := make([][]*timeseries, len(es))
	errs := make([]error, len(es))
	qt.Printf("eval function args in parallel")
	var wg sync.WaitGroup
	for i, e := range es {
		wg.Add(1)
		qtChild := qt.NewChild("eval arg %d", i)
		go func(e metricsql.Expr, i int) {
			defer func() {
				qtChild.Done()
				wg.Done()
			}()
			rv, err := evalExpr(qtChild, ec, e)
			rvs[i] = rv
			errs[i] = err
		}(e, i)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return rvs, nil
}

func evalRollupFuncArgs(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]any, *metricsql.RollupExpr, error) {
	var re *metricsql.RollupExpr
	rollupArgIdx := metricsql.GetRollupArgIdx(fe)
	if len(fe.Args) <= rollupArgIdx {
		return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil))
	}
	args := make([]any, len(fe.Args))
	for i, arg := range fe.Args {
		if i == rollupArgIdx {
			re = getRollupExprArg(arg)
			args[i] = re
			continue
		}
		ts, err := evalExpr(qt, ec, arg)
		if err != nil {
			return nil, nil, fmt.Errorf("cannot evaluate arg #%d for %q: %w", i+1, fe.AppendString(nil), err)
		}
		args[i] = ts
	}
	return args, re, nil
}

func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr {
	re, ok := arg.(*metricsql.RollupExpr)
	if !ok {
		// Wrap non-rollup arg into metricsql.RollupExpr.
		return &metricsql.RollupExpr{
			Expr: arg,
		}
	}
	if !re.ForSubquery() {
		// Return standard rollup if it doesn't contain subquery.
		return re
	}
	me, ok := re.Expr.(*metricsql.MetricExpr)
	if !ok {
		// arg contains subquery.
		return re
	}
	// Convert me[w:step] -> default_rollup(me)[w:step]
	reNew := *re
	reNew.Expr = &metricsql.FuncExpr{
		Name: "default_rollup",
		Args: []metricsql.Expr{
			&metricsql.RollupExpr{Expr: me},
		},
	}
	return &reNew
}

// expr may contain:
// - rollupFunc(m) if iafc is nil
// - aggrFunc(rollupFunc(m)) if iafc isn't nil
func evalRollupFunc(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr,
	re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
	if re.At == nil {
		return evalRollupFuncWithoutAt(qt, ec, funcName, rf, expr, re, iafc)
	}
	tssAt, err := evalExpr(qt, ec, re.At)
	if err != nil {
		return nil, &httpserver.UserReadableError{
			Err: fmt.Errorf("cannot evaluate `@` modifier: %w", err),
		}
	}
	if len(tssAt) != 1 {
		return nil, &httpserver.UserReadableError{
			Err: fmt.Errorf("`@` modifier must return a single series; it returns %d series instead", len(tssAt)),
		}
	}
	atValue := math.NaN()
	for _, v := range tssAt[0].Values {
		if !math.IsNaN(v) {
			atValue = v
			break
		}
	}
	if math.IsNaN(atValue) {
		return nil, &httpserver.UserReadableError{
			Err: fmt.Errorf("`@` modifier must return a non-NaN value"),
		}
	}
	atTimestamp := int64(atValue * 1000)
	ecNew := copyEvalConfig(ec)
	ecNew.Start = atTimestamp
	ecNew.End = atTimestamp
	tss, err := evalRollupFuncWithoutAt(qt, ecNew, funcName, rf, expr, re, iafc)
	if err != nil {
		return nil, err
	}
	// expand single-point tss to the original time range.
	timestamps := ec.getSharedTimestamps()
	for _, ts := range tss {
		v := ts.Values[0]
		values := make([]float64, len(timestamps))
		for i := range timestamps {
			values[i] = v
		}
		ts.Timestamps = timestamps
		ts.Values = values
	}
	return tss, nil
}

func evalRollupFuncWithoutAt(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
	expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
	funcName = strings.ToLower(funcName)
	ecNew := ec
	var offset int64
	if re.Offset != nil {
		offset = re.Offset.Duration(ec.Step)
		ecNew = copyEvalConfig(ecNew)
		ecNew.Start -= offset
		ecNew.End -= offset
		// There is no need in calling AdjustStartEnd() on ecNew if ecNew.MayCache is set to true,
		// since the time range alignment has been already performed by the caller,
		// so cache hit rate should be quite good.
		// See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976
	}
	if funcName == "rollup_candlestick" {
		// Automatically apply `offset -step` to `rollup_candlestick` function
		// in order to obtain expected OHLC results.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462
		step := ecNew.Step
		ecNew = copyEvalConfig(ecNew)
		ecNew.Start += step
		ecNew.End += step
		offset -= step
	}
	var rvs []*timeseries
	var err error
	if me, ok := re.Expr.(*metricsql.MetricExpr); ok {
		rvs, err = evalRollupFuncWithMetricExpr(qt, ecNew, funcName, rf, expr, me, iafc, re.Window)
	} else {
		if iafc != nil {
			logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", funcName, re.AppendString(nil))
		}
		rvs, err = evalRollupFuncWithSubquery(qt, ecNew, funcName, rf, expr, re)
	}
	if err != nil {
		return nil, &httpserver.UserReadableError{
			Err: err,
		}
	}
	if funcName == "absent_over_time" {
		rvs = aggregateAbsentOverTime(ecNew, re.Expr, rvs)
	}
	if offset != 0 && len(rvs) > 0 {
		// Make a copy of timestamps, since they may be used in other values.
		srcTimestamps := rvs[0].Timestamps
		dstTimestamps := append([]int64{}, srcTimestamps...)
		for i := range dstTimestamps {
			dstTimestamps[i] += offset
		}
		for _, ts := range rvs {
			ts.Timestamps = dstTimestamps
		}
	}
	return rvs, nil
}

// aggregateAbsentOverTime collapses tss to a single time series with 1 and nan values.
//
// Values for returned series are set to nan if at least a single tss series contains nan at that point.
// This means that tss contains a series with non-empty results at that point.
// This follows Prometheus logic - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2130
func aggregateAbsentOverTime(ec *EvalConfig, expr metricsql.Expr, tss []*timeseries) []*timeseries {
	rvs := getAbsentTimeseries(ec, expr)
	if len(tss) == 0 {
		return rvs
	}
	for i := range tss[0].Values {
		for _, ts := range tss {
			if math.IsNaN(ts.Values[i]) {
				rvs[0].Values[i] = nan
				break
			}
		}
	}
	return rvs
}

func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) {
	// TODO: determine whether to use rollupResultCacheV here.
	qt = qt.NewChild("subquery")
	defer qt.Done()
	step, err := re.Step.NonNegativeDuration(ec.Step)
	if err != nil {
		return nil, fmt.Errorf("cannot parse step in square brackets at %s: %w", expr.AppendString(nil), err)
	}
	if step == 0 {
		step = ec.Step
	}
	window, err := re.Window.NonNegativeDuration(ec.Step)
	if err != nil {
		return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err)
	}

	ecSQ := copyEvalConfig(ec)
	ecSQ.Start -= window + step + maxSilenceInterval()
	ecSQ.End += step
	ecSQ.Step = step
	ecSQ.MaxPointsPerSeries = *maxPointsSubqueryPerTimeseries
	if err := ValidateMaxPointsPerSeries(ecSQ.Start, ecSQ.End, ecSQ.Step, ecSQ.MaxPointsPerSeries); err != nil {
		return nil, fmt.Errorf("%w; (see -search.maxPointsSubqueryPerTimeseries command-line flag)", err)
	}
	// unconditionally align start and end args to step for subquery as Prometheus does.
	ecSQ.Start, ecSQ.End = alignStartEnd(ecSQ.Start, ecSQ.End, ecSQ.Step)
	tssSQ, err := evalExpr(qt, ecSQ, re.Expr)
	if err != nil {
		return nil, err
	}
	if len(tssSQ) == 0 {
		return nil, nil
	}
	sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
	preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
	if err != nil {
		return nil, err
	}

	var samplesScannedTotal atomic.Uint64
	keepMetricNames := getKeepMetricNames(expr)
	tsw := getTimeseriesByWorkerID()
	seriesByWorkerID := tsw.byWorkerID
	doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
		values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
		preFunc(values, timestamps)
		for _, rc := range rcs {
			if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil {
				samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps)
				samplesScannedTotal.Add(samplesScanned)
				seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
				continue
			}
			var ts timeseries
			samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps)
			samplesScannedTotal.Add(samplesScanned)
			seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
		}
		return values, timestamps
	})
	tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
	for i := range seriesByWorkerID {
		tss = append(tss, seriesByWorkerID[i].tss...)
	}
	putTimeseriesByWorkerID(tsw)

	rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load()))
	qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal.Load())
	return tss, nil
}

var rowsScannedPerQuery = metrics.NewHistogram(`vm_rows_scanned_per_query`)

func getKeepMetricNames(expr metricsql.Expr) bool {
	if ae, ok := expr.(*metricsql.AggrFuncExpr); ok {
		// Extract rollupFunc(...) from aggrFunc(rollupFunc(...)).
		// This case is possible when optimized aggrFunc calculations are used
		// such as `sum(rate(...))`
		if len(ae.Args) != 1 {
			return false
		}
		expr = ae.Args[0]
	}
	if fe, ok := expr.(*metricsql.FuncExpr); ok {
		return fe.KeepMetricNames
	}
	return false
}

func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64)) {
	workers := netstorage.MaxWorkers()
	if workers > len(tss) {
		workers = len(tss)
	}
	seriesPerWorker := (len(tss) + workers - 1) / workers
	workChs := make([]chan *timeseries, workers)
	for i := range workChs {
		workChs[i] = make(chan *timeseries, seriesPerWorker)
	}
	for i, ts := range tss {
		idx := i % len(workChs)
		workChs[idx] <- ts
	}
	for _, workCh := range workChs {
		close(workCh)
	}

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(workerID uint) {
			defer wg.Done()
			var tmpValues []float64
			var tmpTimestamps []int64
			for ts := range workChs[workerID] {
				tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps, workerID)
			}
		}(uint(i))
	}
	wg.Wait()
}

func removeNanValues(dstValues []float64, dstTimestamps []int64, values []float64, timestamps []int64) ([]float64, []int64) {
	hasNan := false
	for _, v := range values {
		if math.IsNaN(v) {
			hasNan = true
		}
	}
	if !hasNan {
		// Fast path - no NaNs.
		dstValues = append(dstValues, values...)
		dstTimestamps = append(dstTimestamps, timestamps...)
		return dstValues, dstTimestamps
	}

	// Slow path - remove NaNs.
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		dstValues = append(dstValues, v)
		dstTimestamps = append(dstTimestamps, timestamps[i])
	}
	return dstValues, dstTimestamps
}

// evalInstantRollup evaluates instant rollup where ec.Start == ec.End.
func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
	expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window int64) ([]*timeseries, error) {
	if ec.Start != ec.End {
		logger.Panicf("BUG: evalInstantRollup cannot be called on non-empty time range; got %s", ec.timeRangeString())
	}
	timestamp := ec.Start
	if qt.Enabled() {
		qt = qt.NewChild("instant rollup %s; time=%s, window=%d", expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
		defer qt.Done()
	}

	evalAt := func(qt *querytracer.Tracer, timestamp, window int64) ([]*timeseries, error) {
		ecCopy := copyEvalConfig(ec)
		ecCopy.Start = timestamp
		ecCopy.End = timestamp
		pointsPerSeries := int64(1)
		return evalRollupFuncNoCache(qt, ecCopy, funcName, rf, expr, me, iafc, window, pointsPerSeries)
	}
	tooBigOffset := func(offset int64) bool {
		maxOffset := window / 2
		if maxOffset > 1800*1000 {
			maxOffset = 1800 * 1000
		}
		return offset >= maxOffset
	}
	deleteCachedSeries := func(qt *querytracer.Tracer) {
		rollupResultCacheV.DeleteInstantValues(qt, expr, window, ec.Step, ec.EnforcedTagFilterss)
	}
	getCachedSeries := func(qt *querytracer.Tracer) ([]*timeseries, int64, error) {
	again:
		offset := int64(0)
		tssCached := rollupResultCacheV.GetInstantValues(qt, expr, window, ec.Step, ec.EnforcedTagFilterss)
		if len(tssCached) == 0 {
			// Cache miss. Re-populate the missing data.
			start := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds()
			offset = timestamp - start
			if offset < 0 {
				start = timestamp
				offset = 0
			}
			if tooBigOffset(offset) {
				qt.Printf("cannot apply instant rollup optimization because the -search.cacheTimestampOffset=%s is too big "+
					"for the requested time=%s and window=%d", cacheTimestampOffset, storage.TimestampToHumanReadableFormat(timestamp), window)
				tss, err := evalAt(qt, timestamp, window)
				return tss, 0, err
			}
			qt.Printf("calculating the rollup at time=%s, because it is missing in the cache", storage.TimestampToHumanReadableFormat(start))
			tss, err := evalAt(qt, start, window)
			if err != nil {
				return nil, 0, err
			}
			if hasDuplicateSeries(tss) {
				qt.Printf("cannot apply instant rollup optimization because the result contains duplicate series")
				tss, err := evalAt(qt, timestamp, window)
				return tss, 0, err
			}
			rollupResultCacheV.PutInstantValues(qt, expr, window, ec.Step, ec.EnforcedTagFilterss, tss)
			return tss, offset, nil
		}
		// Cache hit. Verify whether it is OK to use the cached data.
		offset = timestamp - tssCached[0].Timestamps[0]
		if offset < 0 {
			qt.Printf("do not apply instant rollup optimization because the cached values have bigger timestamp=%s than the requested one=%s",
				storage.TimestampToHumanReadableFormat(tssCached[0].Timestamps[0]), storage.TimestampToHumanReadableFormat(timestamp))
			// Delete the outdated cached values, so the cache could be re-populated with newer values.
			deleteCachedSeries(qt)
			goto again
		}
		if tooBigOffset(offset) {
			qt.Printf("do not apply instant rollup optimization because the offset=%d between the requested timestamp "+
				"and the cached values is too big comparing to window=%d", offset, window)
			// Delete the outdated cached values, so the cache could be re-populated with newer values.
			deleteCachedSeries(qt)
			goto again
		}
		ec.QueryStats.addSeriesFetched(len(tssCached))
		return tssCached, offset, nil
	}

	if !ec.mayCache() {
		qt.Printf("do not apply instant rollup optimization because of disabled cache")
		return evalAt(qt, timestamp, window)
	}
	if window < minWindowForInstantRollupOptimization.Milliseconds() {
		qt.Printf("do not apply instant rollup optimization because of too small window=%d; must be equal or bigger than %d",
			window, minWindowForInstantRollupOptimization.Milliseconds())
		return evalAt(qt, timestamp, window)
	}
	switch funcName {
	case "avg_over_time":
		if iafc != nil {
			qt.Printf("do not apply instant rollup optimization for incremental aggregate %s()", iafc.ae.Name)
			return evalAt(qt, timestamp, window)
		}
		qt.Printf("optimized calculation for instant rollup avg_over_time(m[d]) as (sum_over_time(m[d]) / count_over_time(m[d]))")
		fe := expr.(*metricsql.FuncExpr)
		feSum := *fe
		feSum.Name = "sum_over_time"
		feCount := *fe
		feCount.Name = "count_over_time"
		be := &metricsql.BinaryOpExpr{
			Op:              "/",
			KeepMetricNames: fe.KeepMetricNames,
			Left:            &feSum,
			Right:           &feCount,
		}
		return evalExpr(qt, ec, be)
	case "rate":
		if iafc != nil {
			if !strings.EqualFold(iafc.ae.Name, "sum") {
				qt.Printf("do not apply instant rollup optimization for incremental aggregate %s()", iafc.ae.Name)
				return evalAt(qt, timestamp, window)
			}
			qt.Printf("optimized calculation for sum(rate(m[d])) as (sum(increase(m[d])) / d)")
			afe := expr.(*metricsql.AggrFuncExpr)
			fe := afe.Args[0].(*metricsql.FuncExpr)
			feIncrease := *fe
			feIncrease.Name = "increase"
			re := fe.Args[0].(*metricsql.RollupExpr)
			d := re.Window.Duration(ec.Step)
			if d == 0 {
				d = ec.Step
			}
			afeIncrease := *afe
			afeIncrease.Args = []metricsql.Expr{&feIncrease}
			be := &metricsql.BinaryOpExpr{
				Op:              "/",
				KeepMetricNames: true,
				Left:            &afeIncrease,
				Right: &metricsql.NumberExpr{
					N: float64(d) / 1000,
				},
			}
			return evalExpr(qt, ec, be)
		}
		qt.Printf("optimized calculation for instant rollup rate(m[d]) as (increase(m[d]) / d)")
		fe := expr.(*metricsql.FuncExpr)
		feIncrease := *fe
		feIncrease.Name = "increase"
		re := fe.Args[0].(*metricsql.RollupExpr)
		d := re.Window.Duration(ec.Step)
		if d == 0 {
			d = ec.Step
		}
		be := &metricsql.BinaryOpExpr{
			Op:              "/",
			KeepMetricNames: fe.KeepMetricNames,
			Left:            &feIncrease,
			Right: &metricsql.NumberExpr{
				N: float64(d) / 1000,
			},
		}
		return evalExpr(qt, ec, be)
	case "max_over_time":
		if iafc != nil {
			if !strings.EqualFold(iafc.ae.Name, "max") {
				qt.Printf("do not apply instant rollup optimization for non-max incremental aggregate %s()", iafc.ae.Name)
				return evalAt(qt, timestamp, window)
			}
		}

		// Calculate
		//
		// max_over_time(m[window] @ timestamp)
		//
		// as the maximum of
		//
		// - max_over_time(m[window] @ (timestamp-offset))
		// - max_over_time(m[offset] @ timestamp)
		//
		// if max_over_time(m[offset] @ (timestamp-window)) < max_over_time(m[window] @ (timestamp-offset))
		// otherwise do not apply the optimization
		//
		// where
		//
		// - max_over_time(m[window] @ (timestamp-offset)) is obtained from cache
		// - max_over_time(m[offset] @ timestamp) and max_over_time(m[offset] @ (timestamp-window)) are calculated from the storage
		//   These rollups are calculated faster than max_over_time(m[window]) because offset is smaller than window.
		qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d",
			expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
		defer qtChild.Done()

		tssCached, offset, err := getCachedSeries(qtChild)
		if err != nil {
			return nil, err
		}
		if offset == 0 {
			return tssCached, nil
		}
		// Calculate max_over_time(m[offset] @ timestamp)
		tssStart, err := evalAt(qtChild, timestamp, offset)
		if err != nil {
			return nil, err
		}
		if hasDuplicateSeries(tssStart) {
			qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
			return evalAt(qtChild, timestamp, window)
		}
		// Calculate max_over_time(m[offset] @ (timestamp - window))
		tssEnd, err := evalAt(qtChild, timestamp-window, offset)
		if err != nil {
			return nil, err
		}
		if hasDuplicateSeries(tssEnd) {
			qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
			return evalAt(qtChild, timestamp, window)
		}
		// Calculate the result
		tss, ok := getMaxInstantValues(qtChild, tssCached, tssStart, tssEnd, timestamp)
		if !ok {
			qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains bigger values than tssCached")
			deleteCachedSeries(qtChild)
			return evalAt(qt, timestamp, window)
		}
		return tss, nil
	case "min_over_time":
		if iafc != nil {
			if !strings.EqualFold(iafc.ae.Name, "min") {
				qt.Printf("do not apply instant rollup optimization for non-min incremental aggregate %s()", iafc.ae.Name)
				return evalAt(qt, timestamp, window)
			}
		}

		// Calculate
		//
		//   min_over_time(m[window] @ timestamp)
		//
		// as the minimum of
		//
		//   - min_over_time(m[window] @ (timestamp-offset))
		//   - min_over_time(m[offset] @ timestamp)
		//
		// if min_over_time(m[offset] @ (timestamp-window)) > min_over_time(m[window] @ (timestamp-offset))
		// otherwise do not apply the optimization
		//
		// where
		//
		// - min_over_time(m[window] @ (timestamp-offset)) is obtained from cache
		// - min_over_time(m[offset] @ timestamp) and min_over_time(m[offset] @ (timestamp-window)) are calculated from the storage
		//   These rollups are calculated faster than min_over_time(m[window]) because offset is smaller than window.
		qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d",
			expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
		defer qtChild.Done()

		tssCached, offset, err := getCachedSeries(qtChild)
		if err != nil {
			return nil, err
		}
		if offset == 0 {
			return tssCached, nil
		}
		// Calculate min_over_time(m[offset] @ timestamp)
		tssStart, err := evalAt(qtChild, timestamp, offset)
		if err != nil {
			return nil, err
		}
		if hasDuplicateSeries(tssStart) {
			qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
			return evalAt(qtChild, timestamp, window)
		}
		// Calculate min_over_time(m[offset] @ (timestamp - window))
		tssEnd, err := evalAt(qtChild, timestamp-window, offset)
		if err != nil {
			return nil, err
		}
		if hasDuplicateSeries(tssEnd) {
			qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
			return evalAt(qtChild, timestamp, window)
		}
		// Calculate the result
		tss, ok := getMinInstantValues(qtChild, tssCached, tssStart, tssEnd, timestamp)
		if !ok {
			qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains smaller values than tssCached")
			deleteCachedSeries(qtChild)
			return evalAt(qt, timestamp, window)
		}
		return tss, nil
	case
		"count_eq_over_time",
		"count_gt_over_time",
		"count_le_over_time",
		"count_ne_over_time",
		"count_over_time",
		"increase",
		"increase_pure",
		"sum_over_time":
		if iafc != nil && !strings.EqualFold(iafc.ae.Name, "sum") {
			qt.Printf("do not apply instant rollup optimization for non-sum incremental aggregate %s()", iafc.ae.Name)
			return evalAt(qt, timestamp, window)
		}

		// Calculate
		//
		//   rf(m[window] @ timestamp)
		//
		// as
		//
		//   rf(m[window] @ (timestamp-offset)) + rf(m[offset] @ timestamp) - rf(m[offset] @ (timestamp-window))
		//
		// where
		//
		// - rf is count_over_time, sum_over_time or increase
		// - rf(m[window] @ (timestamp-offset)) is obtained from cache
		// - rf(m[offset] @ timestamp) and rf(m[offset] @ (timestamp-window)) are calculated from the storage
		//   These rollups are calculated faster than rf(m[window]) because offset is smaller than window.
		qtChild := qt.NewChild("optimized calculation for instant rollup %s at time=%s with lookbehind window=%d",
			expr.AppendString(nil), storage.TimestampToHumanReadableFormat(timestamp), window)
		defer qtChild.Done()

		tssCached, offset, err := getCachedSeries(qtChild)
		if err != nil {
			return nil, err
		}
		if offset == 0 {
			return tssCached, nil
		}
		// Calculate rf(m[offset] @ timestamp)
		tssStart, err := evalAt(qtChild, timestamp, offset)
		if err != nil {
			return nil, err
		}
		if hasDuplicateSeries(tssStart) {
			qtChild.Printf("cannot apply instant rollup optimization, since tssStart contains duplicate series")
			return evalAt(qtChild, timestamp, window)
		}
		// Calculate rf(m[offset] @ (timestamp - window))
		tssEnd, err := evalAt(qtChild, timestamp-window, offset)
		if err != nil {
			return nil, err
		}
		if hasDuplicateSeries(tssEnd) {
			qtChild.Printf("cannot apply instant rollup optimization, since tssEnd contains duplicate series")
			return evalAt(qtChild, timestamp, window)
		}
		// Calculate the result
		tss := getSumInstantValues(qtChild, tssCached, tssStart, tssEnd, timestamp)
		return tss, nil
	default:
		qt.Printf("instant rollup optimization isn't implemented for %s()", funcName)
		return evalAt(qt, timestamp, window)
	}
}

func hasDuplicateSeries(tss []*timeseries) bool {
	if len(tss) <= 1 {
		return false
	}

	m := make(map[string]struct{}, len(tss))
	bb := bbPool.Get()
	defer bbPool.Put(bb)

	for _, ts := range tss {
		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
		if _, ok := m[string(bb.B)]; ok {
			return true
		}
		m[string(bb.B)] = struct{}{}
	}
	return false
}

func getMinInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries, timestamp int64) ([]*timeseries, bool) {
	qt = qt.NewChild("calculate the minimum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d", len(tssCached), len(tssStart), len(tssEnd), timestamp)
	defer qt.Done()

	getMin := func(a, b float64) float64 {
		if a < b {
			return a
		}
		return b
	}
	tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, timestamp, getMin)
	qt.Printf("resulting series=%d; ok=%v", len(tss), ok)
	return tss, ok
}

func getMaxInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries, timestamp int64) ([]*timeseries, bool) {
	qt = qt.NewChild("calculate the maximum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d", len(tssCached), len(tssStart), len(tssEnd), timestamp)
	defer qt.Done()

	getMax := func(a, b float64) float64 {
		if a > b {
			return a
		}
		return b
	}
	tss, ok := getMinMaxInstantValues(tssCached, tssStart, tssEnd, timestamp, getMax)
	qt.Printf("resulting series=%d", len(tss))
	return tss, ok
}

func getMinMaxInstantValues(tssCached, tssStart, tssEnd []*timeseries, timestamp int64, f func(a, b float64) float64) ([]*timeseries, bool) {
	assertInstantValues(tssCached)
	assertInstantValues(tssStart)
	assertInstantValues(tssEnd)

	bb := bbPool.Get()
	defer bbPool.Put(bb)

	m := make(map[string]*timeseries, len(tssCached))
	for _, ts := range tssCached {
		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
		if _, ok := m[string(bb.B)]; ok {
			logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
		}
		m[string(bb.B)] = ts
	}

	mStart := make(map[string]*timeseries, len(tssStart))
	for _, ts := range tssStart {
		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
		if _, ok := mStart[string(bb.B)]; ok {
			logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
		}
		mStart[string(bb.B)] = ts
		tsCached := m[string(bb.B)]
		if tsCached != nil && !math.IsNaN(tsCached.Values[0]) {
			if !math.IsNaN(ts.Values[0]) {
				tsCached.Values[0] = f(ts.Values[0], tsCached.Values[0])
			}
		} else {
			m[string(bb.B)] = ts
		}
	}

	for _, ts := range tssEnd {
		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
		tsCached := m[string(bb.B)]
		if tsCached != nil && !math.IsNaN(tsCached.Values[0]) && !math.IsNaN(ts.Values[0]) {
			if ts.Values[0] == f(ts.Values[0], tsCached.Values[0]) {
				tsStart := mStart[string(bb.B)]
				if tsStart == nil || math.IsNaN(tsStart.Values[0]) || tsStart.Values[0] != f(ts.Values[0], tsStart.Values[0]) {
					return nil, false
				}
			}
		}
	}

	rvs := make([]*timeseries, 0, len(m))
	for _, ts := range m {
		rvs = append(rvs, ts)
	}

	setInstantTimestamp(rvs, timestamp)

	return rvs, true
}

// getSumInstantValues aggregates tssCached, tssStart, tssEnd time series
// into a new time series with value = tssCached + tssStart - tssEnd
func getSumInstantValues(qt *querytracer.Tracer, tssCached, tssStart, tssEnd []*timeseries, timestamp int64) []*timeseries {
	qt = qt.NewChild("calculate the sum for instant values across series; cached=%d, start=%d, end=%d, timestamp=%d", len(tssCached), len(tssStart), len(tssEnd), timestamp)
	defer qt.Done()

	assertInstantValues(tssCached)
	assertInstantValues(tssStart)
	assertInstantValues(tssEnd)

	m := make(map[string]*timeseries, len(tssCached))
	bb := bbPool.Get()
	defer bbPool.Put(bb)

	for _, ts := range tssCached {
		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
		if _, ok := m[string(bb.B)]; ok {
			logger.Panicf("BUG: duplicate series found: %s", &ts.MetricName)
		}
		m[string(bb.B)] = ts
	}

	for _, ts := range tssStart {
		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
		tsCached := m[string(bb.B)]
		if tsCached != nil && !math.IsNaN(tsCached.Values[0]) {
			if !math.IsNaN(ts.Values[0]) {
				tsCached.Values[0] += ts.Values[0]
			}
		} else {
			m[string(bb.B)] = ts
		}
	}

	for _, ts := range tssEnd {
		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
		tsCached := m[string(bb.B)]
		if tsCached != nil && !math.IsNaN(tsCached.Values[0]) {
			if !math.IsNaN(ts.Values[0]) {
				tsCached.Values[0] -= ts.Values[0]
			}
		}
	}

	rvs := make([]*timeseries, 0, len(m))
	for _, ts := range m {
		rvs = append(rvs, ts)
	}

	setInstantTimestamp(rvs, timestamp)

	qt.Printf("resulting series=%d", len(rvs))
	return rvs
}

func setInstantTimestamp(tss []*timeseries, timestamp int64) {
	for _, ts := range tss {
		ts.Timestamps[0] = timestamp
	}
}

func assertInstantValues(tss []*timeseries) {
	for _, ts := range tss {
		if len(ts.Values) != 1 {
			logger.Panicf("BUG: instant series must contain a single value; got %d values", len(ts.Values))
		}
		if len(ts.Timestamps) != 1 {
			logger.Panicf("BUG: instant series must contain a single timestamp; got %d timestamps", len(ts.Timestamps))
		}
	}
}

var (
	rollupResultCacheFullHits    = metrics.NewCounter(`vm_rollup_result_cache_full_hits_total`)
	rollupResultCachePartialHits = metrics.NewCounter(`vm_rollup_result_cache_partial_hits_total`)
	rollupResultCacheMiss        = metrics.NewCounter(`vm_rollup_result_cache_miss_total`)

	memoryIntensiveQueries = metrics.NewCounter(`vm_memory_intensive_queries_total`)
)

func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
	expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) {
	window, err := windowExpr.NonNegativeDuration(ec.Step)
	if err != nil {
		return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err)
	}
	if me.IsEmpty() {
		return evalNumber(ec, nan), nil
	}

	if ec.Start == ec.End {
		rvs, err := evalInstantRollup(qt, ec, funcName, rf, expr, me, iafc, window)
		if err != nil {
			err = &httpserver.UserReadableError{
				Err: err,
			}
			return nil, err
		}
		return rvs, nil
	}
	pointsPerSeries := 1 + (ec.End-ec.Start)/ec.Step
	evalWithConfig := func(ec *EvalConfig) ([]*timeseries, error) {
		tss, err := evalRollupFuncNoCache(qt, ec, funcName, rf, expr, me, iafc, window, pointsPerSeries)
		if err != nil {
			err = &httpserver.UserReadableError{
				Err: err,
			}
			return nil, err
		}
		return tss, nil
	}
	if !ec.mayCache() {
		qt.Printf("do not fetch series from cache, since it is disabled in the current context")
		return evalWithConfig(ec)
	}

	// Search for cached results.
	tssCached, start := rollupResultCacheV.GetSeries(qt, ec, expr, window)
	ec.QueryStats.addSeriesFetched(len(tssCached))
	if start > ec.End {
		qt.Printf("the result is fully cached")
		rollupResultCacheFullHits.Inc()
		return tssCached, nil
	}
	if start > ec.Start {
		qt.Printf("partial cache hit")
		rollupResultCachePartialHits.Inc()
	} else {
		qt.Printf("cache miss")
		rollupResultCacheMiss.Inc()
	}

	// Fetch missing results, which aren't cached yet.
	ecNew := ec
	if start != ec.Start {
		ecNew = copyEvalConfig(ec)
		ecNew.Start = start
	}
	// call to evalWithConfig also updates QueryStats.addSeriesFetched
	// without checking whether tss has intersection with tssCached.
	// So final number could be bigger than actual number of unique series.
	// This discrepancy is acceptable, since seriesFetched stat is used as info only.
	tss, err := evalWithConfig(ecNew)
	if err != nil {
		return nil, err
	}

	// Merge cached results with the fetched additional results.
	rvs, ok := mergeSeries(qt, tssCached, tss, start, ec)
	if !ok {
		// Cannot merge series - fall back to non-cached querying.
		qt.Printf("fall back to non-caching querying")
		rvs, err = evalWithConfig(ec)
		if err != nil {
			return nil, err
		}
	}
	rollupResultCacheV.PutSeries(qt, ec, expr, window, rvs)
	return rvs, nil
}

// evalRollupFuncNoCache calculates the given rf with the given lookbehind window.
//
// pointsPerSeries is used only for estimating the needed memory for query processing
func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
	expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, window, pointsPerSeries int64) ([]*timeseries, error) {
	if qt.Enabled() {
		qt = qt.NewChild("rollup %s: timeRange=%s, step=%d, window=%d", expr.AppendString(nil), ec.timeRangeString(), ec.Step, window)
		defer qt.Done()
	}
	if window < 0 {
		return nil, nil
	}
	// Obtain rollup configs before fetching data from db, so type errors could be caught earlier.
	sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
	preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
	if err != nil {
		return nil, err
	}

	// Fetch the result.
	tfss := searchutil.ToTagFilterss(me.LabelFilterss)
	tfss = searchutil.JoinTagFilterss(tfss, ec.EnforcedTagFilterss)
	minTimestamp := ec.Start
	if needSilenceIntervalForRollupFunc[funcName] {
		minTimestamp -= maxSilenceInterval()
	}
	if window > ec.Step {
		minTimestamp -= window
	} else {
		minTimestamp -= ec.Step
	}
	sq := storage.NewSearchQuery(minTimestamp, ec.End, tfss, ec.MaxSeries)
	rss, err := netstorage.ProcessSearchQuery(qt, sq, ec.Deadline)
	if err != nil {
		return nil, err
	}
	qs := ec.QueryStats
	rssLen := rss.Len()
	if rssLen == 0 {
		rss.Cancel()
		return nil, nil
	}
	qs.addSeriesFetched(rssLen)

	// Verify timeseries fit available memory during rollup calculations.
	timeseriesLen := rssLen
	if iafc != nil {
		// Incremental aggregates require holding only GOMAXPROCS timeseries in memory.
		timeseriesLen = cgroup.AvailableCPUs()
		if iafc.ae.Modifier.Op != "" {
			if iafc.ae.Limit > 0 {
				// There is an explicit limit on the number of output time series.
				timeseriesLen *= iafc.ae.Limit
			} else {
				// Increase the number of timeseries for non-empty group list: `aggr() by (something)`,
				// since each group can have own set of time series in memory.
				timeseriesLen *= 1000
			}
		}
		// The maximum number of output time series is limited by rssLen.
		if timeseriesLen > rssLen {
			timeseriesLen = rssLen
		}
	}
	rollupPoints := mulNoOverflow(pointsPerSeries, int64(timeseriesLen*len(rcs)))
	rollupMemorySize := sumNoOverflow(mulNoOverflow(int64(timeseriesLen), 1000), mulNoOverflow(rollupPoints, 16))
	if maxMemory := int64(logQueryMemoryUsage.N); maxMemory > 0 && rollupMemorySize > maxMemory {
		memoryIntensiveQueries.Inc()
		requestURI := ec.GetRequestURI()
		logger.Warnf("remoteAddr=%s, requestURI=%s: the %s requires %d bytes of memory for processing; "+
			"logging this query, since it exceeds the -search.logQueryMemoryUsage=%d; "+
			"the query selects %d time series and generates %d points across all the time series; try reducing the number of selected time series",
			ec.QuotedRemoteAddr, requestURI, expr.AppendString(nil), rollupMemorySize, maxMemory, timeseriesLen*len(rcs), rollupPoints)
	}
	if maxMemory := int64(maxMemoryPerQuery.N); maxMemory > 0 && rollupMemorySize > maxMemory {
		rss.Cancel()
		err := fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series "+
			"according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; "+
			"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
			"increasing -search.maxMemoryPerQuery",
			expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerSeries, maxMemory, rollupMemorySize, float64(ec.Step)/1e3)
		return nil, err
	}
	rml := getRollupMemoryLimiter()
	if !rml.Get(uint64(rollupMemorySize)) {
		rss.Cancel()
		err := fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series; "+
			"total available memory for concurrent requests: %d bytes; requested memory: %d bytes; "+
			"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
			"switching to node with more RAM; increasing -memory.allowedPercent",
			expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerSeries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3)
		return nil, err
	}
	defer rml.Put(uint64(rollupMemorySize))
	qt.Printf("the rollup evaluation needs an estimated %d bytes of RAM for %d series and %d points per series (summary %d points)",
		rollupMemorySize, timeseriesLen, pointsPerSeries, rollupPoints)

	// Evaluate rollup
	keepMetricNames := getKeepMetricNames(expr)
	if iafc != nil {
		return evalRollupWithIncrementalAggregate(qt, funcName, keepMetricNames, iafc, rss, rcs, preFunc, sharedTimestamps)
	}
	return evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps)
}

var (
	rollupMemoryLimiter     memoryLimiter
	rollupMemoryLimiterOnce sync.Once
)

func getRollupMemoryLimiter() *memoryLimiter {
	rollupMemoryLimiterOnce.Do(func() {
		rollupMemoryLimiter.MaxSize = uint64(memory.Allowed()) / 4
	})
	return &rollupMemoryLimiter
}

func maxSilenceInterval() int64 {
	d := minStalenessInterval.Milliseconds()
	if d <= 0 {
		d = 5 * 60 * 1000
	}
	return d
}

func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool,
	iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
	qt = qt.NewChild("rollup %s() with incremental aggregation %s() over %d series; rollupConfigs=%s", funcName, iafc.ae.Name, rss.Len(), rcs)
	defer qt.Done()
	var samplesScannedTotal atomic.Uint64
	err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
		rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
		preFunc(rs.Values, rs.Timestamps)
		ts := getTimeseries()
		defer putTimeseries(ts)
		for _, rc := range rcs {
			if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil {
				samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
				for _, ts := range tsm.m {
					iafc.updateTimeseries(ts, workerID)
				}
				samplesScannedTotal.Add(samplesScanned)
				continue
			}
			ts.Reset()
			samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
			samplesScannedTotal.Add(samplesScanned)
			iafc.updateTimeseries(ts, workerID)

			// ts.Timestamps points to sharedTimestamps. Zero it, so it can be reused.
			ts.Timestamps = nil
			ts.denyReuse = false
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	tss := iafc.finalizeTimeseries()
	rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load()))
	qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(tss), samplesScannedTotal.Load())
	return tss, nil
}

func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig,
	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
	qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
	defer qt.Done()

	var samplesScannedTotal atomic.Uint64
	tsw := getTimeseriesByWorkerID()
	seriesByWorkerID := tsw.byWorkerID
	seriesLen := rss.Len()
	err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
		rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
		preFunc(rs.Values, rs.Timestamps)
		for _, rc := range rcs {
			if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil {
				samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
				samplesScannedTotal.Add(samplesScanned)
				seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
				continue
			}
			var ts timeseries
			samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
			samplesScannedTotal.Add(samplesScanned)
			seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	tss := make([]*timeseries, 0, seriesLen*len(rcs))
	for i := range seriesByWorkerID {
		tss = append(tss, seriesByWorkerID[i].tss...)
	}
	putTimeseriesByWorkerID(tsw)

	rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load()))
	qt.Printf("samplesScanned=%d", samplesScannedTotal.Load())
	return tss, nil
}

func doRollupForTimeseries(funcName string, keepMetricNames bool, rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName,
	valuesSrc []float64, timestampsSrc []int64, sharedTimestamps []int64) uint64 {
	tsDst.MetricName.CopyFrom(mnSrc)
	if len(rc.TagValue) > 0 {
		tsDst.MetricName.AddTag("rollup", rc.TagValue)
	}
	if !keepMetricNames && !rollupFuncsKeepMetricName[funcName] {
		tsDst.MetricName.ResetMetricGroup()
	}
	var samplesScanned uint64
	tsDst.Values, samplesScanned = rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc)
	tsDst.Timestamps = sharedTimestamps
	tsDst.denyReuse = true
	return samplesScanned
}

type timeseriesWithPadding struct {
	tss []*timeseries

	// The padding prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	_ [128 - unsafe.Sizeof([]*timeseries{})%128]byte
}

type timeseriesByWorkerID struct {
	byWorkerID []timeseriesWithPadding
}

func (tsw *timeseriesByWorkerID) reset() {
	byWorkerID := tsw.byWorkerID
	for i := range byWorkerID {
		byWorkerID[i].tss = nil
	}
}

func getTimeseriesByWorkerID() *timeseriesByWorkerID {
	v := timeseriesByWorkerIDPool.Get()
	if v == nil {
		return &timeseriesByWorkerID{
			byWorkerID: make([]timeseriesWithPadding, netstorage.MaxWorkers()),
		}
	}
	return v.(*timeseriesByWorkerID)
}

func putTimeseriesByWorkerID(tsw *timeseriesByWorkerID) {
	tsw.reset()
	timeseriesByWorkerIDPool.Put(tsw)
}

var timeseriesByWorkerIDPool sync.Pool

var bbPool bytesutil.ByteBufferPool

func evalNumber(ec *EvalConfig, n float64) []*timeseries {
	var ts timeseries
	ts.denyReuse = true
	timestamps := ec.getSharedTimestamps()
	values := make([]float64, len(timestamps))
	for i := range timestamps {
		values[i] = n
	}
	ts.Values = values
	ts.Timestamps = timestamps
	return []*timeseries{&ts}
}

func evalString(ec *EvalConfig, s string) []*timeseries {
	rv := evalNumber(ec, nan)
	rv[0].MetricName.MetricGroup = append(rv[0].MetricName.MetricGroup[:0], s...)
	return rv
}

func evalTime(ec *EvalConfig) []*timeseries {
	rv := evalNumber(ec, nan)
	timestamps := rv[0].Timestamps
	values := rv[0].Values
	for i, ts := range timestamps {
		values[i] = float64(ts) / 1e3
	}
	return rv
}

func mulNoOverflow(a, b int64) int64 {
	if math.MaxInt64/b < a {
		// Overflow
		return math.MaxInt64
	}
	return a * b
}

func sumNoOverflow(a, b int64) int64 {
	if math.MaxInt64-a < b {
		// Overflow
		return math.MaxInt64
	}
	return a + b
}

func dropStaleNaNs(funcName string, values []float64, timestamps []int64) ([]float64, []int64) {
	if *noStaleMarkers || funcName == "default_rollup" || funcName == "stale_samples_over_time" {
		// Do not drop Prometheus staleness marks (aka stale NaNs) for default_rollup() function,
		// since it uses them for Prometheus-style staleness detection.
		// Do not drop staleness marks for stale_samples_over_time() function, since it needs
		// to calculate the number of staleness markers.
		return values, timestamps
	}
	// Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values.
	hasStaleSamples := false
	for _, v := range values {
		if decimal.IsStaleNaN(v) {
			hasStaleSamples = true
			break
		}
	}
	if !hasStaleSamples {
		// Fast path: values have no Prometheus staleness marks.
		return values, timestamps
	}
	// Slow path: drop Prometheus staleness marks from values.
	dstValues := values[:0]
	dstTimestamps := timestamps[:0]
	for i, v := range values {
		if decimal.IsStaleNaN(v) {
			continue
		}
		dstValues = append(dstValues, v)
		dstTimestamps = append(dstTimestamps, timestamps[i])
	}
	return dstValues, dstTimestamps
}
